package cn.icanci.loopstack.rec.engine.sdk.server;

import cn.hutool.json.JSONUtil;
import cn.icanci.loopstack.rec.engine.sdk.rule.repository.EngineRepositoryHolder;
import cn.icanci.loopstack.rec.common.model.socket.PublishDTO;
import cn.icanci.loopstack.rec.common.model.socket.SocketMessage;
import cn.icanci.loopstack.rec.common.model.socket.UriConstant;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.ThrowableUtil;

import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REC的网络交互参考了xxl-job的设计
 * - xxl-job的网络设计比较简洁，此处就没有造轮子了
 * 
 * @author icanci
 * @since 1.0 Created in 2022/11/20 18:22
 */
public class RecNettyServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private static final Logger           logger = LoggerFactory.getLogger(RecNettyServerHandler.class);

    private static EngineRepositoryHolder engineRepositoryHolder;

    private final ThreadPoolExecutor      pool;

    public RecNettyServerHandler(ThreadPoolExecutor pool) {
        this.pool = pool;
    }

    public static void setEngineRepositoryHolder(EngineRepositoryHolder engineRepositoryHolder) {
        RecNettyServerHandler.engineRepositoryHolder = engineRepositoryHolder;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        String requestData = msg.content().toString(CharsetUtil.UTF_8);
        String uri = msg.uri();
        HttpMethod httpMethod = msg.method();
        boolean keepAlive = HttpUtil.isKeepAlive(msg);
        // 对于这种配置数据，会有很频繁的变更
        pool.execute(() -> {
            Object responseObj = process(httpMethod, uri, requestData);

            String responseJson = JSONUtil.toJsonStr(responseObj);

            writeResponse(ctx, keepAlive, responseJson);
        });
    }

    private Object process(HttpMethod httpMethod, String uri, String requestData) {
        if (HttpMethod.POST != httpMethod) {
            return SocketMessage.fail("Only post requests are supported");
        }
        if (StringUtils.isBlank(uri)) {
            return SocketMessage.fail("Request uri is null");
        }
        try {
            switch (uri) {
                case UriConstant.HEARTBEAT:
                    logger.info("[{}][RecNettyClientHandler][process] heartbeat", Thread.currentThread().getName());
                    return SocketMessage.success();
                case UriConstant.REFRESH:
                    PublishDTO publish = JSONUtil.toBean(requestData, PublishDTO.class);
                    Set<String> domainCodes = publish.getDomainCodes();

                    if (CollectionUtils.isEmpty(domainCodes)) {
                        return SocketMessage.fail("The domain code list to refresh is empty");
                    }
                    engineRepositoryHolder.refresh(domainCodes);
                    logger.info("[{}][RecNettyClientHandler][process] domainCodes:{} was refreshed!", Thread.currentThread().getName(), domainCodes);
                    return SocketMessage.success();
                default:
                    return SocketMessage.fail("Invalid request, uri-mapping(" + uri + ") not found");
            }
        } catch (Exception e) {
            return SocketMessage.fail(ThrowableUtil.stackTraceToString(e));
        }
    }

    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(ThrowableUtil.stackTraceToString(cause));
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.channel().close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
